{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# How to run ETL\n",
    "> This notebook shows how to run an ETL pipeline. There are 2 steps:\n",
    "\n",
    "1. Prepare the configs\n",
    "2. Pass the configs to `ETLPipeline`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 1. Prepare the configs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from pathlib import Path\n",
    "from dataverse.config import Config\n",
    "from omegaconf import OmegaConf\n",
    "\n",
    "# Base directory: two levels above the notebook's working directory\n",
    "main_path = Path(os.path.abspath('../..'))\n",
    "\n",
    "# E = Extract, T = Transform, L = Load\n",
    "E_path = main_path / \"./dataverse/config/etl/sample/data_ingestion___sampling.yaml\"\n",
    "T_path = main_path / \"./dataverse/config/etl/sample/data_preprocess___dedup.yaml\"\n",
    "L_path = main_path / \"./dataverse/config/etl/sample/data_load___hf_obj.yaml\"\n",
    "\n",
    "# Load each YAML file into a config object\n",
    "E_config = Config.load(E_path)\n",
    "T_config = Config.load(T_path)\n",
    "L_config = Config.load(L_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Extract Config\n",
    "\n",
    "- generate fake UFL data\n",
    "- sample 10% of the data to reduce the dataset size\n",
    "- save to parquet `./sample/sample_ufl.parquet`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  appname: dataverse_etl_sample\n",
      "  driver:\n",
      "    memory: 16g\n",
      "etl:\n",
      "- name: data_ingestion___test___generate_fake_ufl\n",
      "- name: utils___sampling___random\n",
      "  args:\n",
      "    sample_n_or_frac: 0.1\n",
      "- name: data_load___parquet___ufl2parquet\n",
      "  args:\n",
      "    save_path: ./sample/sample_ufl.parquet\n",
      "\n"
     ]
    }
   ],
   "source": [
    "print(OmegaConf.to_yaml(E_config))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Transform Config\n",
    "\n",
    "- load parquet `./sample/sample_ufl.parquet`\n",
    "- deduplicate on the `text` column using MinHash LSH with Jaccard similarity over 15-grams\n",
    "- save to parquet `./sample/preprocess_ufl.parquet`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  appname: dataverse_etl_sample\n",
      "  driver:\n",
      "    memory: 16g\n",
      "etl:\n",
      "- name: data_ingestion___parquet___pq2ufl\n",
      "  args:\n",
      "    path:\n",
      "    - ./sample/sample_ufl.parquet\n",
      "- name: deduplication___minhash___lsh_jaccard\n",
      "- name: data_load___parquet___ufl2parquet\n",
      "  args:\n",
      "    save_path: ./sample/preprocess_ufl.parquet\n",
      "\n"
     ]
    }
   ],
   "source": [
    "print(OmegaConf.to_yaml(T_config))"
   ]
  },
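  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To give some intuition for the deduplication step, here is a minimal standalone sketch of MinHash over 15-grams. It is not dataverse's implementation: the helper names are hypothetical, character-level 15-grams are an assumption (the config does not say whether the n-grams are characters or words), and the LSH banding implied by the step name `lsh_jaccard` is left out.\n",
    "\n",
    "```python\n",
    "import hashlib\n",
    "\n",
    "def shingles(text, n=15):\n",
    "    # Assumption: character-level n-grams over whitespace-normalized text.\n",
    "    text = ' '.join(text.split())\n",
    "    if len(text) <= n:\n",
    "        return {text}\n",
    "    return {text[i:i + n] for i in range(len(text) - n + 1)}\n",
    "\n",
    "def jaccard(a, b):\n",
    "    # Exact Jaccard similarity: intersection size over union size.\n",
    "    union = a | b\n",
    "    return len(a & b) / len(union) if union else 1.0\n",
    "\n",
    "def seeded_hash(item, seed):\n",
    "    # 64-bit hash of item; each seed acts as a different hash function.\n",
    "    digest = hashlib.blake2b(item.encode('utf-8'), digest_size=8,\n",
    "                             salt=seed.to_bytes(8, 'big')).digest()\n",
    "    return int.from_bytes(digest, 'big')\n",
    "\n",
    "def minhash_signature(items, num_perm=64):\n",
    "    # Keep the minimum hash value under each of the num_perm hash functions.\n",
    "    return [min(seeded_hash(item, seed) for item in items)\n",
    "            for seed in range(num_perm)]\n",
    "\n",
    "def estimated_jaccard(sig_a, sig_b):\n",
    "    # The fraction of matching signature slots estimates the Jaccard similarity.\n",
    "    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)\n",
    "\n",
    "doc_a = 'Close experience energy professor run show. Career stuff which PM consumer I tax.'\n",
    "doc_b = 'Close experience energy professor run show. Career stuff which PM consumer we tax.'\n",
    "doc_c = 'Ten magazine build economic course another other.'\n",
    "\n",
    "sh_a, sh_b, sh_c = shingles(doc_a), shingles(doc_b), shingles(doc_c)\n",
    "sig_a, sig_b, sig_c = (minhash_signature(s) for s in (sh_a, sh_b, sh_c))\n",
    "\n",
    "# Near-duplicate pair: exact similarity, then the MinHash estimate\n",
    "print(jaccard(sh_a, sh_b), estimated_jaccard(sig_a, sig_b))\n",
    "# Unrelated pair\n",
    "print(jaccard(sh_a, sh_c), estimated_jaccard(sig_a, sig_c))\n",
    "```\n",
    "\n",
    "The estimate approximates the exact value, and its accuracy improves as `num_perm` grows. A deduplication step would then keep only one document from any pair whose similarity exceeds a chosen threshold."
   ]
  },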
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 Load Config\n",
    "\n",
    "- load parquet `./sample/preprocess_ufl.parquet`\n",
    "- convert to a Hugging Face dataset and return the object"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  appname: dataverse_etl_sample\n",
      "  driver:\n",
      "    memory: 16g\n",
      "etl:\n",
      "- name: data_ingestion___parquet___pq2ufl\n",
      "  args:\n",
      "    path:\n",
      "    - ./sample/preprocess_ufl.parquet\n",
      "- name: data_load___huggingface___ufl2hf_obj\n",
      "\n"
     ]
    }
   ],
   "source": [
    "print(OmegaConf.to_yaml(L_config))"
   ]
  },
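  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The three configs are chained through files on disk: the Extract and Transform stages each end by writing a parquet file, and the following stage starts by reading it. A small sanity check of that wiring can be sketched with plain dicts copied from the `etl` lists printed above (Spark settings omitted). The helper functions are hypothetical and not part of dataverse.\n",
    "\n",
    "```python\n",
    "extract_etl = [\n",
    "    {'name': 'data_ingestion___test___generate_fake_ufl'},\n",
    "    {'name': 'utils___sampling___random', 'args': {'sample_n_or_frac': 0.1}},\n",
    "    {'name': 'data_load___parquet___ufl2parquet',\n",
    "     'args': {'save_path': './sample/sample_ufl.parquet'}},\n",
    "]\n",
    "transform_etl = [\n",
    "    {'name': 'data_ingestion___parquet___pq2ufl',\n",
    "     'args': {'path': ['./sample/sample_ufl.parquet']}},\n",
    "    {'name': 'deduplication___minhash___lsh_jaccard'},\n",
    "    {'name': 'data_load___parquet___ufl2parquet',\n",
    "     'args': {'save_path': './sample/preprocess_ufl.parquet'}},\n",
    "]\n",
    "load_etl = [\n",
    "    {'name': 'data_ingestion___parquet___pq2ufl',\n",
    "     'args': {'path': ['./sample/preprocess_ufl.parquet']}},\n",
    "    {'name': 'data_load___huggingface___ufl2hf_obj'},\n",
    "]\n",
    "\n",
    "def output_path(etl):\n",
    "    # Hypothetical helper: save_path of the last step, or None if it has none.\n",
    "    return etl[-1].get('args', {}).get('save_path')\n",
    "\n",
    "def input_paths(etl):\n",
    "    # Hypothetical helper: parquet paths read by the first step, if any.\n",
    "    return etl[0].get('args', {}).get('path', [])\n",
    "\n",
    "def is_chained(upstream, downstream):\n",
    "    # True when the downstream stage reads the file the upstream stage wrote.\n",
    "    return output_path(upstream) in input_paths(downstream)\n",
    "\n",
    "print(is_chained(extract_etl, transform_etl))\n",
    "print(is_chained(transform_etl, load_etl))\n",
    "```\n",
    "\n",
    "Both checks print `True` for the sample configs. If a `save_path` is changed in one config, the matching `path` in the next config has to change with it."
   ]
  },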
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 2. Pass the configs to ETLPipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "An error occurred (ExpiredToken) when calling the GetCallerIdentity operation: The security token included in the request is expired\n"
     ]
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[ No AWS Credentials Found] - Failed to set spark conf for S3\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "23/12/14 16:10:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "23/12/14 16:10:35 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).\n",
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[ No AWS Credentials Found] - Failed to set spark conf for S3\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "23/12/14 16:10:40 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).\n",
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[ No AWS Credentials Found] - Failed to set spark conf for S3\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "23/12/14 16:10:47 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).\n",
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Downloading and preparing dataset spark/-1156931706 to /root/.cache/huggingface/datasets/spark/-1156931706/0.0.0...\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Dataset spark downloaded and prepared to /root/.cache/huggingface/datasets/spark/-1156931706/0.0.0. Subsequent calls will reuse this data.\n"
     ]
    }
   ],
   "source": [
    "# raw -> ufl\n",
    "etl_pipeline.run(E_config)\n",
    "\n",
    "# ufl -> dedup -> ufl\n",
    "etl_pipeline.run(T_config)\n",
    "\n",
    "# ufl -> hf_obj\n",
    "spark, dataset = etl_pipeline.run(L_config)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Dataset({\n",
       "    features: ['id', 'meta', 'name', 'text'],\n",
       "    num_rows: 14\n",
       "})"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'id': 'abfabf49-249b-4bbb-93fa-965bf79d2e37',\n",
       " 'meta': '{\"name\": \"Elizabeth Martinez\", \"age\": 45, \"address\": \"446 Leah Ports\\\\nGibsonchester, MA 98492\", \"job\": \"Psychologist, counselling\"}',\n",
       " 'name': 'test_fake_ufl',\n",
       " 'text': 'To wall trip measure picture.\\nClose experience energy professor run show. Career stuff which PM consumer I tax. Ten magazine build economic course another other.'}"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "dataset[0]"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llm",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.11"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
